{
return tp == QP_DOUBLE;
}
+static inline int qp_is_bool(qp_types_t tp)
+{
+ return tp == QP_TRUE || tp == QP_FALSE;
+}
static inline int qp_is_raw_term(qp_obj_t * qp_obj)
{
return (qp_obj->tp == QP_RAW &&
#define SIRIDB_MAX_DBNAME_LEN 256 /* 255 + NULL */
#define SIRIDB_SCHEMA 5
#define SIRIDB_FLAG_REINDEXING 1
+#define SIRIDB_FLAG_DROPPED 2
#define DEF_DROP_THRESHOLD 1.0 /* 100% */
#define DEF_SELECT_POINTS_LIMIT 1000000 /* one million */
int siridb_is_db_path(const char * dbpath);
siridb_t * siridb_new(const char * dbpath, int lock_flags);
siridb_t * siridb_get(llist_t * siridb_list, const char * dbname);
+siridb_t * siridb_get_by_qp(llist_t * siridb_list, qp_obj_t * qp_dbname);
void siridb_decref_cb(siridb_t * siridb, void * args);
ssize_t siridb_get_file(char ** buffer, siridb_t * siridb);
int siridb_open_files(siridb_t * siridb);
int siridb_save(siridb_t * siridb);
void siridb__free(siridb_t * siridb);
+void siridb_drop(siridb_t * siridb);
#define siridb_incref(siridb) siridb->ref++
#define siridb_decref(_siridb) if (!--_siridb->ref) siridb__free(_siridb)
BPROTO_ENABLE_BACKUP_MODE, /* empty */
BPROTO_DISABLE_BACKUP_MODE, /* empty */
BPROTO_TEE_PIPE_NAME_UPDATE, /* tee pipe name */
+ BPROTO_DROP_DATABASE, /* empty */
} bproto_client_t;
/*
BPROTO_ACK_ENABLE_BACKUP_MODE, /* empty */
BPROTO_ACK_DISABLE_BACKUP_MODE, /* empty */
BPROTO_RES_GROUPS, /* [[name, series], ...] */
- BPROTO_ACK_TEE_PIPE_NAME /* empty */
+ BPROTO_ACK_TEE_PIPE_NAME, /* empty */
+ BPROTO_ACK_DROP_DATABASE, /* empty */
} bproto_server_t;
SERVICE_NEW_DATABASE_,
SERVICE_NEW_POOL,
SERVICE_NEW_REPLICA,
+ SERVICE_DROP_DATABASE,
SERVICE_GET_VERSION=64,
SERVICE_GET_ACCOUNTS,
SERVICE_GET_DATABASES
int xpath_is_dir(const char * path);
ssize_t xpath_get_content(char ** buffer, const char * fn);
int xpath_get_exec_path(char * path);
+int xpath_rmdir(const char * path);
#endif /* XPATH_H_ */
siridb_t * siridb;
siridb_user_t * user;
- char dbname[qp_dbname->len + 1];
- memcpy(dbname, qp_dbname->via.raw, qp_dbname->len);
- dbname[qp_dbname->len] = 0;
-
char username[qp_username->len + 1];
memcpy(username, qp_username->via.raw, qp_username->len);
username[qp_username->len] = 0;
memcpy(password, qp_password->via.raw, qp_password->len);
password[qp_password->len] = 0;
- if ((siridb = siridb_get(siri.siridb_list, dbname)) == NULL)
+ if ((siridb = siridb_get_by_qp(siri.siridb_list, qp_dbname)) == NULL)
{
log_warning("User authentication request failed: unknown database");
return CPROTO_ERR_AUTH_UNKNOWN_DB;
static siridb_t * siridb__from_dat(const char * dbpath);
static int siridb__read_conf(siridb_t * siridb);
static int siridb__lock(const char * dbpath, int lock_flags);
+static inline int siridb__cmp_db(siridb_t * siridb, qp_obj_t * dbname);
#define READ_DB_EXIT_WITH_ERROR(ERROR_MSG) \
strcpy(err_msg, ERROR_MSG); \
return NULL;
}
+/*
+ * Get a siridb object by qpack name.
+ */
+siridb_t * siridb_get_by_qp(llist_t * siridb_list, qp_obj_t * qp_dbname)
+{
+ assert (qp_dbname->tp == QP_RAW);
+
+ llist_node_t * node = siridb_list->first;
+ siridb_t * siridb;
+
+ while (node != NULL)
+ {
+ siridb = (siridb_t *) node->data;
+ if (qp_dbname->len == strlen(siridb->dbname) &&
+ strncmp(
+ siridb->dbname,
+ (const char *) qp_dbname->via.raw,
+ qp_dbname->len) == 0)
+ {
+ return siridb;
+ }
+ node = node->next;
+ }
+
+ return NULL;
+}
+
+
/*
* Sometimes we need a callback function and cannot use a macro expansion.
*/
}
}
+ uv_mutex_destroy(&siridb->series_mutex);
+ uv_mutex_destroy(&siridb->shards_mutex);
+
+ if (siridb->flags & SIRIDB_FLAG_DROPPED)
+ {
+ xpath_rmdir(siridb->dbpath);
+ }
+
free(siridb->dbpath);
free(siridb->dbname);
free(siridb->time);
+ free(siridb);
+}
- uv_mutex_destroy(&siridb->series_mutex);
- uv_mutex_destroy(&siridb->shards_mutex);
+void siridb_drop(siridb_t * siridb)
+{
+ if (siridb->flags & SIRIDB_FLAG_DROPPED)
+ {
+ return;
+ }
- free(siridb);
+ log_warning("dropping database '%s'", siridb->dbname);
+
+ siridb->flags |= SIRIDB_FLAG_DROPPED;
+
+ uv_mutex_lock(&siri.siridb_mutex);
+
+ (void *) llist_remove(siri.siridb_list, NULL, siridb);
+
+ uv_mutex_unlock(&siri.siridb_mutex);
+
+ if (siridb->replicate != NULL)
+ {
+ siridb_replicate_close(siridb->replicate);
+ }
+
+ if (siridb->reindex != NULL && siridb->reindex->timer != NULL)
+ {
+ siridb_reindex_close(siridb->reindex);
+ }
+
+ if (siridb->groups != NULL)
+ {
+ siridb_groups_destroy(siridb->groups);
+ }
+
+ siridb_decref(siridb);
}
/*
return 0;
}
+static inline int siridb__cmp_db(siridb_t * siridb, qp_obj_t * dbname)
+{
+ size_t len = strlen(siridb->dbname);
+ return (
+ dbname->len == len &&
+ strncmp(siridb->dbname, (const char *) dbname->via.raw, len) == 0
+ );
+}
+
static void on_tee_pipe_name_update(
sirinet_stream_t * client,
sirinet_pkg_t * pkg);
+static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
static void on_query(
sirinet_stream_t * client,
case BPROTO_TEE_PIPE_NAME_UPDATE:
on_tee_pipe_name_update(client, pkg);
break;
+ case BPROTO_DROP_DATABASE:
+ on_drop_database(client, pkg);
+ break;
}
}
}
}
+static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+ SERVER_CHECK_AUTHENTICATED(client, server)
+
+ siridb_t * siridb = client->siridb;
+ sirinet_pkg_t * package = NULL;
+
+ siridb_drop(siridb);
+
+ package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_DROP_DATABASE, NULL);
+ if (package != NULL)
+ {
+ sirinet_pkg_send(client, package);
+ }
+}
+
static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg)
{
SERVER_CHECK_AUTHENTICATED(client, server)
case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
+ case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
default:
sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
return protocol_str;
case BPROTO_ACK_DISABLE_BACKUP_MODE: return "BPROTO_ACK_DISABLE_BACKUP_MODE";
case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS";
case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
+ case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
default:
sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
return protocol_str;
*/
#define PCRE2_CODE_UNIT_WIDTH 8
+#include <lock/lock.h>
+#include <logger/logger.h>
+#include <pcre2.h>
+#include <siri/db/buffer.h>
+#include <siri/db/reindex.h>
+#include <siri/db/server.h>
+#include <siri/db/servers.h>
#include <siri/service/account.h>
#include <siri/service/client.h>
-#include <stddef.h>
#include <siri/service/request.h>
#include <siri/siri.h>
-#include <logger/logger.h>
-#include <pcre2.h>
-#include <lock/lock.h>
-#include <xmath/xmath.h>
+#include <siri/version.h>
+#include <stddef.h>
#include <unistd.h>
#include <uuid/uuid.h>
-#include <siri/db/server.h>
-#include <siri/db/buffer.h>
-#include <siri/version.h>
-#include <siri/db/reindex.h>
+#include <xmath/xmath.h>
#define DEFAULT_TIME_PRECISION 1
#define DEFAULT_BUFFER_SIZE 1024
qp_unpacker_t * qp_unpacker,
qp_obj_t * qp_account,
char * err_msg);
+static cproto_server_t SERVICE_on_drop_database(
+ qp_unpacker_t * qp_unpacker,
+ char * err_msg);
static cproto_server_t SERVICE_on_new_database(
qp_unpacker_t * qp_unpacker,
char * err_msg);
static int SERVICE_list_accounts(
siri_service_account_t * account,
qp_packer_t * packer);
+static void SERVICE_on_drop_database_cb(vec_t *, void *);
static size_t max_filename_sz;
client,
SERVICE_NEW_REPLICA,
err_msg);
+ case SERVICE_DROP_DATABASE:
+ return SERVICE_on_drop_database(qp_unpacker, err_msg);
case SERVICE_GET_VERSION:
return SERVICE_on_get_version(qp_unpacker, packaddr, err_msg);
case SERVICE_GET_ACCOUNTS:
CPROTO_ERR_SERVICE : CPROTO_ACK_SERVICE;
}
+/*
+ * Returns CPROTO_ACK_SERVICE when successful.
+ * In case of an error CPROTO_ERR_SERVICE can be returned in which case err_msg
+ * is set, or CPROTO_ERR_SERVICE_INVALID_REQUEST is returned.
+ */
+static cproto_server_t SERVICE_on_drop_database(
+ qp_unpacker_t * qp_unpacker,
+ char * err_msg)
+{
+ _Bool ignore_offline;
+ siridb_t * siridb;
+ qp_obj_t qp_key, qp_target, qp_ignore_offline;
+ sirinet_pkg_t * pkg;
+ vec_t * servers;
+
+ qp_target.tp = QP_HOOK;
+ qp_ignore_offline.tp = QP_HOOK;
+
+ if (!qp_is_map(qp_next(qp_unpacker, NULL)))
+ {
+ return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+ }
+
+ while (qp_next(qp_unpacker, &qp_key) == QP_RAW)
+ {
+ if ( strncmp(
+ (const char *) qp_key.via.raw,
+ "database",
+ qp_key.len) == 0 &&
+ qp_next(qp_unpacker, &qp_target) == QP_RAW)
+ {
+ continue;
+ }
+
+ if ( strncmp(
+ (const char *) qp_key.via.raw,
+ "ignore_offline",
+ qp_key.len) == 0 &&
+ qp_is_bool(qp_next(qp_unpacker, &qp_ignore_offline)))
+ {
+ continue;
+ }
+ return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+ }
+
+ if (qp_target.tp == QP_HOOK)
+ {
+ return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+ }
+
+ ignore_offline = (
+ qp_ignore_offline.tp != QP_HOOK &&
+ qp_ignore_offline.tp == QP_TRUE
+ );
+
+ siridb = siridb_get_by_qp(siri.siridb_list, &qp_target);
+ if (siridb == NULL)
+ {
+ sprintf(err_msg, "cannot find database '%.*s'",
+ (int) qp_target.len, (char *) qp_target.via.raw);
+ return CPROTO_ERR_SERVICE;
+ }
+
+ if (!ignore_offline && !siridb_servers_online(siridb))
+ {
+ sprintf(err_msg,
+ "at least one server is offline, "
+ "set `ignore_offline` to true if you want to "
+ "ignore offline servers");
+ return CPROTO_ERR_SERVICE;
+ }
+
+ pkg = sirinet_pkg_new(0, 0, BPROTO_DROP_DATABASE, NULL);
+ servers = siridb_servers_other2vec(siridb);
+ if (pkg == NULL || servers == NULL)
+ {
+ free(pkg);
+ vec_free(servers);
+ sprintf(err_msg, "memory allocation error");
+ return CPROTO_ERR_SERVICE;
+ }
+
+ siridb_servers_send_pkg(
+ servers,
+ pkg,
+ 0,
+ SERVICE_on_drop_database_cb,
+ NULL);
+
+ siridb_drop(siridb);
+ vec_free(servers);
+
+ return CPROTO_ACK_SERVICE;
+}
+
/*
* Returns CPROTO_ACK_SERVICE when successful.
* In case of an error CPROTO_ERR_SERVICE can be returned in which case err_msg
{
return qp_add_string(packer, account->account);
}
+
+static void SERVICE_on_drop_database_cb(
+ vec_t * promises,
+ void * data __attribute__((unused)))
+{
+ log_debug("drop database has been send to all online servers");
+
+ if (promises != NULL)
+ {
+ size_t i;
+ sirinet_promise_t * promise;
+ for (i = 0; i < promises->len; i++)
+ {
+ promise = promises->data[i];
+ if (promise != NULL)
+ {
+ free(promise->data);
+ sirinet_promise_decref(promise);
+ }
+ }
+ }
+}
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
+#include <dirent.h>
#include <xpath/xpath.h>
/*
return 0;
}
+
+int xpath_rmdir(const char * path)
+{
+ DIR * d = opendir(path);
+ if (!d)
+ return -1;
+
+ size_t bufsz = 0, path_len = strlen(path);
+ const char * slash = (path[path_len - 1] == '/') ? "" : "/";
+ struct dirent * p;
+ char * buf = NULL;
+
+ while ((p = readdir(d)))
+ {
+ size_t len;
+
+ /* Skip the names "." and ".." as we don't want to recurse on them. */
+ if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, ".."))
+ continue;
+
+ len = path_len + strlen(p->d_name) + 2;
+ if (len > bufsz)
+ {
+ bufsz = len;
+ char * tmp = realloc(buf, bufsz);
+ if (!tmp) goto stop;
+ buf = tmp;
+ }
+
+ snprintf(buf, len, "%s%s%s", path, slash, p->d_name);
+
+ if (xpath_is_dir(buf) ? xpath_rmdir(buf) : unlink(buf))
+ goto stop;
+ }
+
+stop:
+ free(buf);
+ closedir(d);
+
+ return rmdir(path);
+}